In [ ]:
from __future__ import print_function
In [ ]:
# All numbers from 0 to 1000. Split in 4 partitions
numbers = sc.parallelize( range(0,1001), 4 )
print(numbers.getNumPartitions())
print(numbers.count())
print(numbers.take(10))
We want to compute $\sum_{i=0}^{499} \cos(2i+1)$ using the numbers RDD we have just created. The approach is deliberately a bit convoluted so that we can chain transformations; in practice we could do it in a more direct way.
We start by keeping only the odd numbers: the list of odd numbers from 0 to 1000 is the same as the list of $(2i+1)$ for $i \in [0,499]$.
In [ ]:
# Transformation: take only the odd numbers
odd = numbers.filter( lambda x : x % 2 )
odd.take(10) # action
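The claim that the odd numbers in the range are exactly the values $2i+1$ can be checked locally. The sketch below uses Python's built-in filter on a plain range instead of the RDD; local_odd is an illustrative name that does not appear in the notebook. It also shows that the predicate x % 2 works because its result (1 or 0) is interpreted as true or false.

```python
# Local stand-in for numbers.filter(...): same predicate, plain Python range
local_odd = list(filter(lambda x: x % 2, range(0, 1001)))

print(len(local_odd))   # number of odd values kept
print(local_odd[:10])   # first ten odd numbers

# The filtered list matches the sequence 2i+1 for i = 0..499
print(local_odd == [2 * i + 1 for i in range(500)])
```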
Now we compute the cosine of each number. We could use a map with
lambda x : cos(x)
but in this case, since it's just calling a function, we use the function directly:
In [ ]:
# Transformation: compute the cosine of each number
from math import cos
odd_cosine = odd.map( cos )
odd_cosine.take(10) # action
Finally we sum all values. Again, we could use a lambda function such as
lambda a,b : a+b
but Python's operator module already provides this as the add function, so we just use it.
Note that this is an action, so it is the one that triggers the stage computation. In theory the previous transformations have not produced any results yet; in practice, since we executed take on them, we already forced those operations to run.
In [ ]:
# Action: sum all values
from operator import add
result = odd_cosine.reduce( add )
print(result)
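As a sanity check on the printed value, the same sum can be computed locally without Spark and compared with the closed form $\sin(1000)/(2\sin 1)$, which follows from the standard identity for a sum of cosines in arithmetic progression. This is only a minimal sketch in plain Python; the names direct and closed_form are illustrative and not part of the notebook.

```python
from math import cos, sin

# Direct evaluation of sum_{i=0}^{499} cos(2i+1)
direct = sum(cos(2 * i + 1) for i in range(500))

# Closed form: sum_{i=0}^{n-1} cos(2i+1) = sin(2n) / (2 sin 1), here with n = 500
closed_form = sin(1000) / (2 * sin(1))

print(direct, closed_form)
```

The Spark result should agree with both values up to floating-point rounding; the order in which the partial sums are combined may change the last few digits.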
In [ ]:
a = sc.parallelize( range(20), 4 )
Now we do a classic map+reduce to sum its squared values:
In [ ]:
b1 = a.map( lambda x : x*x )
In [ ]:
from operator import add
result1 = b1.reduce( add )
print(result1)
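For comparison, the same map+reduce pattern can be sketched on a local sequence with Python's built-in map and functools.reduce. This is only a local analogy: Spark applies the reduction inside each partition and then combines the partial results. The names local_squares and local_result are illustrative.

```python
from functools import reduce
from operator import add

# Local equivalent of a.map(lambda x: x*x)
local_squares = map(lambda x: x * x, range(20))

# Local equivalent of b1.reduce(add)
local_result = reduce(add, local_squares)
print(local_result)  # 2470, i.e. 19*20*39/6
```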
Now we try with flatMap. First let's do it wrong:
In [ ]:
b2 = a.flatMap( lambda x : x*x )
# This will trigger an error
b2.take(1)
Let's do it right: the function passed to flatMap must return an iterable (e.g. a list), even if it contains just one element (or none).
In [ ]:
# Ensure flatMap returns a list, even if it's a list of 1
b2 = a.flatMap( lambda x : [x*x] )
In [ ]:
result2 = b2.reduce( add )
print(result2)
result2 == result1
So, why should we use flatMap? Because we can create several rows (or none at all) out of each row of the input RDD.
In [ ]:
b2b = a.flatMap( lambda x : [x, x*x] )
In [ ]:
b2b.take(6)
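Conceptually, flatMap applies the function to every element and then concatenates the returned iterables into a single sequence of rows. A rough local equivalent, assuming a plain Python range instead of an RDD, can be sketched with itertools.chain; local_flat_map is a hypothetical helper, not a Spark API. The last part reproduces the earlier failure: a bare integer is not iterable, so there is nothing to flatten.

```python
from itertools import chain

def local_flat_map(func, items):
    """Apply func to each item and flatten the resulting iterables."""
    return list(chain.from_iterable(func(x) for x in items))

data = range(20)

# Two output rows per input element
pairs = local_flat_map(lambda x: [x, x * x], data)
print(pairs[:6])  # [0, 0, 1, 1, 2, 4]

# Zero or one output row per element: squares of the even numbers only
even_squares = local_flat_map(lambda x: [x * x] if x % 2 == 0 else [], data)
print(even_squares[:4])  # [0, 4, 16, 36]

# Returning a bare number instead of an iterable fails
try:
    local_flat_map(lambda x: x * x, data)
except TypeError as err:
    print("TypeError:", err)
```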
We repeat the same operation as above, but using mapPartitions. This time it is different: our function does not receive a single element but a whole partition (actually an iterator over its elements). We must iterate over it and return another iterator over the results of our computation.
Admittedly, using mapPartitions for this operation does not make much sense. But in general it can be handy for our function to have access to all the elements in a partition.
In [ ]:
# In Python, the easiest way of returning an iterator is by creating
# a generator function via yield
def mapper( it ):
    for n in it:
        yield n*n
# Now we have the function, let's use it
b3 = a.mapPartitions( mapper )
result3 = b3.reduce( add )
print(result3)
result3 == result1
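A case where access to the whole partition does pay off is pre-aggregation: the function can consume all the elements of a partition and emit a single partial result. The sketch below is one possible illustration, not part of the original notebook. partition_sum_of_squares is a hypothetical name, and the four ranges only stand in for the partitions of a (the actual split is decided by Spark), so the function can be tried without a Spark context.

```python
def partition_sum_of_squares(it):
    """Consume a whole partition and yield one value: the sum of its squares."""
    total = 0
    for n in it:
        total += n * n
    yield total

# Assumption: an even split of range(20) into 4 pieces, used here only
# as a local stand-in for the RDD partitions
fake_partitions = [range(0, 5), range(5, 10), range(10, 15), range(15, 20)]

partials = [next(partition_sum_of_squares(iter(p))) for p in fake_partitions]
print(partials)       # one partial sum per partition
print(sum(partials))  # same total as the map+reduce version
```

On the notebook's RDD the function would be used as a.mapPartitions( partition_sum_of_squares ).reduce( add ), which should give the same total while handing only one number per partition to the reduce step.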
For a final twist, mapPartitionsWithIndex works the same as mapPartitions, but our function will receive two arguments, in this order: the index of the partition, i.e. an integer in $[0,numPartitions)$, and the iterator over the elements of the partition, as before. So we can know which partition we are in when processing its elements.
In [ ]:
# In Python, the easiest way of returning an iterator is by creating
# a generator function via yield
def mapper( partitionIndex, it ):
    for n in it:
        yield n*n
# Now we have the function, let's use it
b4 = a.mapPartitionsWithIndex( mapper )
result4 = b4.reduce( add )
print(result4)
result4 == result1
... though in this case we have not used the index, it might be useful for certain tasks.
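One task where the index is useful is inspecting how the elements are distributed across partitions. The sketch below tags every element with the index of its partition. tag_with_partition is a hypothetical name, and the list of ranges is again only an assumed local stand-in for the partitions of a, so that the function can be run without Spark.

```python
def tag_with_partition(partition_index, it):
    """Yield (partition index, element) for every element of the partition."""
    for n in it:
        yield (partition_index, n)

# Assumption: an even split of range(20) into 4 pieces, standing in for the RDD
fake_partitions = [range(0, 5), range(5, 10), range(10, 15), range(15, 20)]

tagged = []
for index, part in enumerate(fake_partitions):
    # Same call shape Spark uses: index first, then the iterator
    tagged.extend(tag_with_partition(index, iter(part)))

print(tagged[:6])  # [(0, 0), (0, 1), (0, 2), (0, 3), (0, 4), (1, 5)]
```

On the notebook's RDD, a.mapPartitionsWithIndex( tag_with_partition ).collect() would return the same kind of pairs, with the real partition assignment chosen by Spark.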